package com.gl.sass.mq.listener;

import lombok.Getter;

import java.util.Optional;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.gl.sass.mq.consumer.BaseConsumer;
import com.gl.sass.mq.properties.ConsumerProperties;

/**
 * 
 * @author xiehong
 *
 */
@Getter
public class ConsumerListener {

    private ConsumerProperties consumerProperties;

    private BaseConsumer baseConsumer;

    private DefaultMQPushConsumer defaultMQPushConsumer;

    public ConsumerListener(ConsumerProperties consumerProperties, BaseConsumer baseConsumer) {
        this.consumerProperties = consumerProperties;
        this.baseConsumer = baseConsumer;
    }

    public void start() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.getConsumerProperties().getGroupName());
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeMessageBatchMaxSize(this.getConsumerProperties().getConsumerMessageBatchMaxSize());
        consumer.setConsumeThreadMin(this.getConsumerProperties().getThreadMin());
        consumer.setConsumeThreadMax(this.getConsumerProperties().getThreadMax());
        consumer.setPullBatchSize(this.getConsumerProperties().getPullBatchSize());
        consumer.setNamesrvAddr(this.getConsumerProperties().getNamesrvAddr());
        consumer.subscribe(this.getConsumerProperties().getTopic(), this.getConsumerProperties().getTag());
        consumer.registerMessageListener(baseConsumer);
        consumer.start();
        this.defaultMQPushConsumer = consumer;
    }

    public void destory() {
        Optional.ofNullable(defaultMQPushConsumer).ifPresent(DefaultMQPushConsumer::shutdown);
    }
}
